package v0

import (
	"fmt"
	"gitee.com/ymofen/gobase"
	"sort"
	"strings"
	"sync"
)

/*
支持使用路由进行订阅 # 匹配下级所有， +匹配当前层
topic/news

10000 topic

pub:
sync.map
lk:
	utils_subscribe_v2._test.go:168: 25505, 5094/s
	utils_subscribe_v2._test.go:168: 50409, 5035/s

nolk:
	utils_subscribe_v2._test.go:168: 25767, 5135/s
	utils_subscribe_v2._test.go:168: 51914, 5175/s

map with lk
		utils_subscribe_v2._test.go:168: 34228, 6844/s
		utils_subscribe_v2._test.go:168: 68952, 6894/s

map with lk
	    utils_subscribe_v2._test.go:168: 35343, 6977/s
	    utils_subscribe_v2._test.go:168: 71194, 7072/s

sesslst -> map
    utils_subscribe_v2._test.go:168: 36668, 7321/s
    utils_subscribe_v2._test.go:168: 74022, 7383/s
*/

type SubFunc = func(id, topic string, args ...interface{}) (ok bool)

type ISub interface {
	Sub(id, topic string, cb func(id, topic string, args ...interface{}) (ok bool))
}

type IUnsub interface {
	Unsub(id, topic string) bool
}

type IPub interface {
	Pub(topic string, max int, args ...interface{}) int
}

// 订阅中心,
type Subscribe struct {
	lk             sync.RWMutex
	topiclst       map[string]*SubTopicItem
	routeParseFunc func(topic string) interface{}                  // 必须确保有值
	routeMatchFunc func(route interface{}, topic interface{}) bool // 必须确保有值
}

func innerParseRoute(route string) interface{} {
	return strings.Split(route, "/")
}

func innerRouteIncludesTopic0(route []string, topic []string) bool {
	if len(route) == 0 {
		return len(topic) == 0
	}

	if len(topic) == 0 {
		return route[0] == "#"
	}

	if route[0] == "#" {
		return true
	}

	if (route[0] == "+") || (route[0] == topic[0]) {
		return innerRouteIncludesTopic0(route[1:], topic[1:])
	}
	return false
}

/*
1000W:consume:730(ms)
*/
func innerRouteIncludesTopic(route interface{}, topic interface{}) bool {
	return innerRouteIncludesTopic0(route.([]string), topic.([]string))
}

func NewSubscribe() *Subscribe {
	return &Subscribe{
		topiclst:       make(map[string]*SubTopicItem),
		routeParseFunc: innerParseRoute,
		routeMatchFunc: innerRouteIncludesTopic,
	}
}

func NewSubscribeEx(routeParseFunc func(topic string) interface{}, routeMatchFunc func(route interface{}, topic interface{}) bool) *Subscribe {
	if routeParseFunc == nil {
		panic("invalid routeParseFunc")
	}
	if routeMatchFunc == nil {
		panic("invalid routeMatch")
	}
	return &Subscribe{
		topiclst:       make(map[string]*SubTopicItem),
		routeParseFunc: routeParseFunc,
		routeMatchFunc: routeMatchFunc,
	}
}

type SubTopicItem struct {
	subtopicid string
	subtopic   interface{}
	sesslst    map[string]SubFunc
}

func (this *Subscribe) Close() error {
	this.lk.Lock()
	defer this.lk.Unlock()
	for k, _ := range this.topiclst {
		delete(this.topiclst, k)
	}
	return nil
}

func (this *Subscribe) matchTopic(topics interface{}, fn func(itm *SubTopicItem) bool) {
	this.rangeTopics(func(key string, itm *SubTopicItem) bool {
		if this.routeMatchFunc(itm.subtopic, topics) {
			return fn(itm)
		}
		return true
	})
}

func (this *Subscribe) checkGetTopic(topic string, new bool) *SubTopicItem {
	itm := this.topiclst[topic]
	if new && itm == nil {
		itm = &SubTopicItem{subtopicid: topic, subtopic: this.routeParseFunc(topic), sesslst: make(map[string]SubFunc)}
		this.topiclst[topic] = itm

	}
	return itm
}

func (this *Subscribe) Status() string {
	return fmt.Sprintf("topic-n:%d, sess:%d", this.TopicCount(), this.Count())
}

func (this *Subscribe) GetTopicSubCount(topic string) int {
	this.lk.RLock()
	defer this.lk.RUnlock()
	itm := this.checkGetTopic(topic, false)
	if itm == nil {
		return 0
	}
	return len(itm.sesslst)
}

func (this *Subscribe) TopicCount() int {
	this.lk.RLock()
	defer this.lk.RUnlock()
	return len(this.topiclst)
}

func (this *Subscribe) rangeTopics(fn func(key string, itm *SubTopicItem) bool) {
	for k, v := range this.topiclst {
		if !fn(k, v) {
			break
		}
	}
}

func (this *Subscribe) Count() int {
	this.lk.RLock()
	defer this.lk.RUnlock()
	n := 0
	this.rangeTopics(func(key string, itm *SubTopicItem) bool {
		n += len(itm.sesslst)
		return true
	})
	return n
}

func (this *Subscribe) StatusDetail() string {
	var sb gobase.BytesBuilder
	sb.Appendf("topic:%d", this.TopicCount())
	lst := make([]string, 0, 1024)
	this.rangeTopics(func(key string, itm *SubTopicItem) bool {
		lst = append(lst, key)
		return true
	})
	sort.Strings(lst)
	for i := 0; i < len(lst); i++ {
		itm := this.checkGetTopic(lst[i], false)
		if itm != nil {
			sb.Appendf(",%s:%d", lst[i], len(itm.sesslst))
		}
	}
	return sb.String()
}

// 订阅一个主题
//
// topic订阅主题,为空不进行订阅
// id订阅者id,topic 下id重复将会被覆盖(之前订阅失效)
func (this *Subscribe) Sub(id, topic string, cb func(id, topic string, args ...interface{}) (ok bool)) {
	if len(topic) == 0 {
		return
	}
	this.lk.Lock()
	defer this.lk.Unlock()
	itm := this.checkGetTopic(topic, true)
	itm.sesslst[id] = cb
	return
}

// 取消订阅
// id 订阅时传入的id
// topic订阅的主题
func (this *Subscribe) Unsub(id, topic string) bool {
	if len(topic) == 0 {
		return false
	}
	this.lk.Lock()
	defer this.lk.Unlock()
	itm := this.checkGetTopic(topic, false)
	if itm != nil {
		delete(itm.sesslst, id)
		if len(itm.sesslst) == 0 {
			delete(this.topiclst, topic)
		}
		return true
	}
	return false
}

/*
max:0, 全部投递
>1 投递成功max次后停止
*/
//func (this *Subscribe) Pub0(topic string, max int, args ...interface{}) int {
//	n := 0
//	chkmap := make(map[interface{}]interface{})
//	topics := ParseRoute(topic)
//	this.lk.RLock()
//	this.matchTopic(topics, func(itm *SubTopicItem) bool {
//		itm.sesslst.Range(func(key, value interface{}) bool {
//			if chkmap[key] == nil {
//				chkmap[key] = value
//			}
//			return true
//		})
//		return true
//	})
//	this.lk.RUnlock()
//
//	for _, value := range chkmap {
//		if fn, ok := value.(func(args ...interface{}) bool); ok {
//			if fn(args...) {
//				n++
//			}
//		}
//	}
//	return n
//}

/*
max:0, 全部投递
>1 投递成功max次后停止
循环map

	utils_subscribe_v2._test.go:168: 34228, 6844/s
	utils_subscribe_v2._test.go:168: 68952, 6894/s

循环lst

	utils_subscribe_v2._test.go:168: 35343, 6977/s
	utils_subscribe_v2._test.go:168: 71194, 7072/s
*/
//向主题订阅者推送数据
//topic推送的主题
//max最大接收者,超过该值不再进行推送
func (this *Subscribe) Pub(topic string, max int, args ...interface{}) int {
	n := 0
	chkmap := make(map[interface{}]int8)
	fnlst := make([]SubFunc, 0, 1024)
	idlst := make([]string, 0, 1024)
	topics := this.routeParseFunc(topic)
	this.lk.RLock()
	this.matchTopic(topics, func(itm *SubTopicItem) bool {
		for key, value := range itm.sesslst {
			if chkmap[key] == 0 {
				fnlst = append(fnlst, value)
				idlst = append(idlst, key)
				chkmap[key] = 1
			}
		}
		return true
	})
	this.lk.RUnlock()

	for idx, fn := range fnlst {
		if fn(idlst[idx], topic, args...) {
			n++
			if max > 0 && n >= max {
				break
			}
		}
	}
	return n
}
